package com.fwmagic.dynamic_rule.functions;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.ResultBean;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.UserActionCountQueryService;
import com.fwmagic.dynamic_rule.service.UserActionSequenceQueryService;
import com.fwmagic.dynamic_rule.service.UserProfileQueryService;
import com.fwmagic.dynamic_rule.service.impl.UserActionCountQueryServiceStateImpl;
import com.fwmagic.dynamic_rule.service.impl.UserActionSequenceQueryServiceStateImpl;
import com.fwmagic.dynamic_rule.service.impl.UserProfileQueryServiceHbaseImpl;
import com.fwmagic.dynamic_rule.utils.RuleSimulator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Core rule-processing logic.
 *
 * <p>Every incoming {@link LogBean} is first appended to a keyed {@link ListState} that holds
 * the detailed event history. The current {@link RuleParam} is then fetched from
 * {@link RuleSimulator}; if the event's id equals the rule's trigger event id, the profile,
 * action-count and action-sequence conditions are evaluated in that order. A {@link ResultBean}
 * is emitted only when all three checks pass.
 */
public class RuleProcessFunction extends KeyedProcessFunction<String, LogBean, ResultBean> {

    /**
     * Name under which the event-history state is registered. Flink identifies state by its
     * descriptor name, so it must be non-empty and unique within this function.
     * NOTE(review): state written under the previous (blank) name will not be restored under
     * this one — confirm no savepoints need to be carried over.
     */
    private static final String EVENT_STATE_NAME = "eventState";

    // The fields below are assigned in open() and are runtime-only; they are marked transient
    // so they never take part in serialization of the function instance.

    /** Keyed state holding the detailed events received so far. */
    private transient ListState<LogBean> listState;

    /** Evaluates the user-profile condition of a rule. */
    private transient UserProfileQueryService userProfileQueryService;

    /** Evaluates the action-count condition of a rule against the event-history state. */
    private transient UserActionCountQueryService userActionCountQueryService;

    /** Evaluates the action-sequence condition of a rule against the event-history state. */
    private transient UserActionSequenceQueryService userActionSequenceQueryService;

    /**
     * Registers the event-history state and builds the query services that use it.
     *
     * @param parameters configuration handed over by the Flink runtime (not used here)
     * @throws Exception if state registration or service construction fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Prepare the state that stores the underlying detailed events.
        ListStateDescriptor<LogBean> stateDescriptor =
                new ListStateDescriptor<>(EVENT_STATE_NAME, LogBean.class);
        listState = getRuntimeContext().getListState(stateDescriptor);

        // Build the core query services; the state-backed ones share the same list state.
        userProfileQueryService = new UserProfileQueryServiceHbaseImpl();
        userActionCountQueryService = new UserActionCountQueryServiceStateImpl(listState);
        userActionSequenceQueryService = new UserActionSequenceQueryServiceStateImpl(listState);
    }

    /**
     * Stores the event in the history state and, if it triggers the current rule, evaluates the
     * rule's conditions and emits a result when all of them match.
     *
     * @param logBean   the incoming event
     * @param context   the process-function context (not used here)
     * @param collector receives a {@link ResultBean} when the rule is fully matched
     * @throws Exception if state access or any of the query services fails
     */
    @Override
    public void processElement(LogBean logBean, Context context, Collector<ResultBean> collector) throws Exception {
        // Always record the event, even when it does not trigger the rule, so that later
        // count/sequence queries can see it.
        listState.add(logBean);

        // Fetch the rule parameters.
        RuleParam ruleParam = RuleSimulator.getRuleParam();

        // 1. Only the rule's trigger event starts an evaluation.
        if (!ruleParam.getTriggerParam().getEventId().equals(logBean.getEventId())) {
            return;
        }

        String deviceId = logBean.getDeviceId();

        // 2. Profile condition.
        if (!userProfileQueryService.judgeProfileCondition(deviceId, ruleParam)) {
            return;
        }

        // 3. Action-count condition. The device id is passed instead of an empty placeholder,
        //    in line with the profile query above.
        if (!userActionCountQueryService.queryActionCounts(deviceId, ruleParam)) {
            return;
        }

        // 4. Action-sequence condition (same device-id argument as above).
        if (!userActionSequenceQueryService.queryActionSequence(deviceId, ruleParam)) {
            return;
        }

        // All conditions matched: emit the result for this rule.
        ResultBean resultBean = new ResultBean();
        resultBean.setTimeStamp(logBean.getTimeStamp());
        resultBean.setDeviceId(deviceId);
        resultBean.setRuleId(ruleParam.getRuleId());
        collector.collect(resultBean);
    }
}
